GreengrassとAWS IoTルールエンジンを使って、デバイスの接続状態をDynamoDBに保存する
サーバーレス開発部@札幌の佐藤です。
最近は、AWS IoT Greengrassを使用することが多いです。その際に、Greengrass Core デバイスの接続状況をDynamoDBに保存するユースケースがあったため、紹介します。
仮想的なIoTデバイスとしてEC2を設定
今回はテスト用として、仮想的なIoTデバイスとしてEC2を立ち上げます。
下記の記事にまとまっていますので、手順通りに進めてEC2をセットアップして、Greengrassデーモンを起動します。
Greengrass Coreデバイスの接続情報を検知する
AWS IoTではライフサイクルイベントでデバイスの接続/切断情報がMQTTトピックにパブリッシュされます。この機能を使って、IoTデバイスの接続/切断状態を検知します。
パブリッシュされるMQTTトピック
接続時
$aws/events/presence/connected/clientId
切断時
$aws/events/presence/disconnected/clientId
試してみる
- AWS IoTのテストを選択し、トピックへのサブスクリプションの欄に以下を記述し、トピックへのサブスクライブ
$aws/events/presence/#
- EC2にSSHで接続し、以下のコマンドでGreengrass Coreデーモンを開始します
cd /greengrass/ggc/core sudo ./greengrass start
- デーモンを開始すると、以下のように接続情報のトピックのペイロードがパブリッシュされます。
{ "clientId": "greengrass_connected_test_Core", "timestamp": 1558330955711, "eventType": "connected", "sessionIdentifier": "6b77c891-576f-47dd-b44c-XXXXXXXXXXXX", "principalIdentifier": "89024a381377baXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "versionNumber": 2 }
- 以下のコマンドでGreengrass Coreデーモンを停止します
cd /greengrass/ggc/core sudo ./greengrass stop
- デーモンを停止すると、以下のように切断情報のペイロードがパブリッシュされます
{ "clientId": "greengrass_connected_test_Core", "timestamp": 1558330930654, "eventType": "disconnected", "clientInitiatedDisconnect": false, "sessionIdentifier": "d2dbdeca-3cce-4e83-XXXXXXXXXXX", "principalIdentifier": "89024a381377ba91136fbaXXXXXXXXXXXXXXXXXXXXXXXX", "versionNumber": 0 }
上記のeventType
を使用して、接続情報をDynamoDBに保存していきます。
デバイスの接続情報をDynamoDBに保存する
上記のライフサイクルイベントで接続情報が検知できることがわかったので、AWS IoTのルールエンジンを使ってパブリッシュされたペイロードをLambdaを用いてDynamoDBに保存してみたいと思います。
接続情報保存用のDynamoDBとLambdaを作成
今回は、AWS SAMを使用してDynamoDB、AWS IoTルールエンジンをデプロイしていきます。以下のSAMテンプレートを作成します。今回はPython 3.7
を使用します。
SAMテンプレート
AWSTemplateFormatVersion: '2010-09-09' Transform: 'AWS::Serverless-2016-10-31' Description: 'Greengrass Connection Test' Globals: Function: Runtime: python3.7 Timeout: 10 Environment: Variables: CONNECTED_TBL: !Ref ConnectedTable Resources: ConnectedTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "id" AttributeType: "S" KeySchema: - AttributeName: "id" KeyType: "HASH" BillingMode: PAY_PER_REQUEST LambdaExecuteRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - 'lambda.amazonaws.com' Action: sts:AssumeRole Policies: - PolicyName: "allow_all" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "*" Resource: - "*" # AWS Iotルールエンジンを作成する UpdateConnected: Type: "AWS::Serverless::Function" Properties: Handler: "handler.handler" Role: !GetAtt LambdaExecuteRole.Arn Events: IoT: Type: IoTRule Properties: AwsIotSqlVersion: 2016-03-23 Sql: >- SELECT * FROM '$aws/events/presence/connected/#' UpdateDisconnected: Type: "AWS::Serverless::Function" Properties: Handler: "handler.handler" Role: !GetAtt LambdaExecuteRole.Arn Events: IoT: Type: IoTRule Properties: AwsIotSqlVersion: 2016-03-23 Sql: >- SELECT * FROM '$aws/events/presence/disconnected/#'
Lambdaハンドラ
import boto3 import logging import os region = os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-1') logger = logging.getLogger() dynamodb = boto3.resource('dynamodb') def handler(event: dict, context: dict): try: # Greengrass Core からパブリッシュされたIdと接続状態を取得 client_id = event['clientId'] event_type = event['eventType'] # eventTypeの値から接続状態を取得 if event_type == 'disconnected': is_connected = False elif event_type == 'connected': is_connected = True else: return table = dynamodb.Table(os.getenv('CONNECTED_TBL')) data = table.get_item(Key={ 'id': client_id }) if 'Item' not in data: res = table.put_item( Item={ 'id': client_id, 'is_connected': is_connected }) else: res = table.update_item( Key={ 'id': client_id }, UpdateExpression='set is_connected = :i', ExpressionAttributeValues={ ':i': is_connected }, ReturnValues="UPDATED_NEW" ) return except Exception as e: logger.error(e) raise e
上記のSAMテンプレートとLambdaハンドラを同じフォルダに起き、以下のCLIコマンドを実行してデプロイします。
sam package --template-file sam.yml --output-template-file packaged.yml --s3-bucket デプロイパッケージを置くS3バケット sam deploy --template-file packaged.yml --stack-name 任意のスタック名 --capabilities CAPABILITY_IAM
動作確認
デプロイが完了したら実際に動作確認をしていきます。EC2にSSHで接続し、Greengrass Coreデーモンを起動・停止してみます。
デーモン起動
sudo /greengrass/ggc/core/greengrassd start
起動するとAWS IoTのルールエンジンを通してLambdaが起動して、 is_connected
が true
になっています。
デーモン停止
sudo /greengrass/ggc/core/greengrassd stop
停止するとAWS IoTのルールエンジンを通してLambdaが起動して、 is_connected
が false
になっています。
まとめ
AWS IoTのルールエンジンを使用して、デバイスの接続状態を保存することができました。 ルールエンジンは、Lambdaを実行する他にも、DynamoDBに直接データを保存したり、SQSキューにデータを送信したりと様々なユースケースに対応できます。